# 复杂系统设计概述

Claude Code 可以帮助开发者设计和实现复杂的软件系统。通过系统化的设计方法和分阶段的实现策略,开发者可以利用 Claude Code 构建可扩展、可维护的复杂系统。

## 系统架构设计

### 1. 微服务架构
示例:设计微服务架构
用户请求:
"设计一个电商系统的微服务架构"
Claude Code 生成的架构设计:
## 系统架构

```
┌─────────────────────────────────────────────────────────────┐
│ API Gateway │
│ (负载均衡、认证、路由、限流、监控) │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────┼─────────────────────┐
│ │ │
┌───────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
│ User Service │ │ Product Service │ │ Order Service │
│ - 用户管理 │ │ - 商品管理 │ │ - 订单管理 │
│ - 认证授权 │ │ - 库存管理 │ │ - 支付处理 │
│ - 个人资料 │ │ - 分类管理 │ │ - 物流跟踪 │
└────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└─────────────────────┼─────────────────────┘
│
┌─────────────────────┼─────────────────────┐
│ │ │
┌───────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
│ Payment Service│ │ Inventory │ │ Notification │
│ - 支付处理 │ │ Service │ │ Service │
│ - 退款处理 │ │ - 库存同步 │ │ - 邮件通知 │
│ - 交易记录 │ │ - 预警管理 │ │ - 短信通知 │
└────────────────┘ └─────────────────┘ └─────────────────┘
│
┌─────────▼─────────┐
│ Message Broker │
│ (Kafka/RabbitMQ) │
└───────────────────┘
```
## 服务定义
### User Service (用户服务)
```python
# services/user_service/app.py
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager, create_access_token
import bcrypt
# Application wiring for the user service: Flask app, database handle, JWT support.
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI='postgresql://localhost/users_db',
    # NOTE(review): placeholder signing key; presumably replaced by real
    # configuration outside this example - confirm.
    JWT_SECRET_KEY='your-secret-key',
)
db = SQLAlchemy(app)
jwt = JWTManager(app)
class User(db.Model):
    """Account record stored in the ``users`` table."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(255), nullable=False)
    name = db.Column(db.String(100), nullable=False)
    phone = db.Column(db.String(20))
    created_at = db.Column(db.DateTime, server_default=db.func.now())

    def set_password(self, password):
        """Hash ``password`` with a fresh bcrypt salt and store the result."""
        raw = password.encode('utf-8')
        hashed = bcrypt.hashpw(raw, bcrypt.gensalt())
        # bcrypt returns bytes; decode so the value is stored as text.
        self.password_hash = hashed.decode('utf-8')

    def check_password(self, password):
        """Return whether ``password`` matches the stored bcrypt hash."""
        candidate = password.encode('utf-8')
        stored = self.password_hash.encode('utf-8')
        return bcrypt.checkpw(candidate, stored)
@app.route('/register', methods=['POST'])
def register():
    """Register a new user from a JSON request body.

    The body must be a JSON object with non-empty string fields ``email``,
    ``name`` and ``password``; ``phone`` is optional.

    Returns:
        ``{'user_id': ...}`` with status 201 on success; status 400 when the
        body is not a JSON object or a required field is missing/invalid;
        status 409 when the e-mail is already registered.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    required = ('email', 'name', 'password')
    invalid = [
        field for field in required
        if not isinstance(data.get(field), str) or not data[field]
    ]
    if invalid:
        return jsonify({
            'error': f"Missing or invalid fields: {', '.join(invalid)}"
        }), 400

    # ``email`` is declared unique on the model; checking first turns the
    # common duplicate case into a 409 instead of an unhandled database error.
    # NOTE: two concurrent registrations can still race past this check.
    if User.query.filter_by(email=data['email']).first() is not None:
        return jsonify({'error': 'Email already registered'}), 409

    user = User(
        email=data['email'],
        name=data['name'],
        phone=data.get('phone'),
    )
    user.set_password(data['password'])
    db.session.add(user)
    db.session.commit()
    return jsonify({'user_id': user.id}), 201
@app.route('/login', methods=['POST'])
def login():
    """Authenticate a user and issue an access token.

    The body must be a JSON object with string fields ``email`` and
    ``password``.

    Returns:
        ``{'access_token': ..., 'user': {...}}`` on success; status 400 when
        the body is malformed; status 401 when the credentials do not match.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    email = data.get('email')
    password = data.get('password')
    if not isinstance(email, str) or not isinstance(password, str):
        return jsonify({'error': 'email and password are required'}), 400

    user = User.query.filter_by(email=email).first()
    # Same response for "unknown user" and "wrong password" so the endpoint
    # does not reveal which e-mails are registered.
    if not user or not user.check_password(password):
        return jsonify({'error': 'Invalid credentials'}), 401

    access_token = create_access_token(identity=user.id)
    return jsonify({
        'access_token': access_token,
        'user': {
            'id': user.id,
            'email': user.email,
            'name': user.name,
        },
    })
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Return id, email, name and phone for ``user_id``; 404 when absent."""
    record = User.query.get_or_404(user_id)
    exposed_fields = ('id', 'email', 'name', 'phone')
    profile = {field: getattr(record, field) for field in exposed_fields}
    return jsonify(profile)
```

### Product Service (商品服务)
```python
# services/product_service/app.py
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from datetime import datetime
# Application wiring for the product service: Flask app, database handle, migrations.
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI='postgresql://localhost/products_db',
)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
class Product(db.Model):
    """Catalogue item stored in the ``products`` table."""

    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False)
    description = db.Column(db.Text)
    # NOTE(review): Float is binary floating point - confirm it is acceptable
    # for prices in this example.
    price = db.Column(db.Float, nullable=False)
    stock = db.Column(db.Integer, default=0)
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'))
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(
        db.DateTime,
        server_default=db.func.now(),
        onupdate=db.func.now(),
    )

    # The backref also exposes the reverse collection as ``Category.products``.
    category = db.relationship('Category', backref='products')
class Category(db.Model):
    """Product category stored in ``categories``; may reference a parent category."""

    __tablename__ = 'categories'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'))

    # Self-referential relationship: ``remote_side`` marks ``id`` as the parent
    # side, and the backref exposes the sub-categories as ``children``.
    parent = db.relationship(
        'Category',
        remote_side=[id],
        backref='children',
    )
@app.route('/products', methods=['POST'])
def create_product():
    """Create a product from a JSON request body.

    Required fields: ``name`` (non-empty string) and ``price`` (non-negative
    number). Optional fields: ``description``, ``stock`` (non-negative
    integer, defaults to 0) and ``category_id``.

    Returns:
        ``{'product_id': ...}`` with status 201 on success, or status 400
        with an ``error`` message when the body fails validation.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    name = data.get('name')
    if not isinstance(name, str) or not name:
        return jsonify({'error': 'name is required'}), 400

    price = data.get('price')
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(price, bool) or not isinstance(price, (int, float)) or price < 0:
        return jsonify({'error': 'price must be a non-negative number'}), 400

    stock = data.get('stock', 0)
    if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
        return jsonify({'error': 'stock must be a non-negative integer'}), 400

    product = Product(
        name=name,
        description=data.get('description'),
        price=price,
        stock=stock,
        category_id=data.get('category_id'),
    )
    db.session.add(product)
    db.session.commit()
    return jsonify({'product_id': product.id}), 201
@app.route('/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
    """Return the details of one product; responds 404 when it does not exist."""
    item = Product.query.get_or_404(product_id)

    # Products without a category report ``None`` instead of a name.
    if item.category:
        category_name = item.category.name
    else:
        category_name = None

    details = {
        'id': item.id,
        'name': item.name,
        'description': item.description,
        'price': item.price,
        'stock': item.stock,
        'category': category_name,
    }
    return jsonify(details)
@app.route('/products/<int:product_id>/stock', methods=['PUT'])
def update_stock(product_id):
    """Overwrite the stock level of one product.

    The body must be a JSON object whose ``stock`` field is a non-negative
    integer.

    Returns:
        ``{'stock': ...}`` with the stored value on success; status 400 when
        the body fails validation; status 404 when the product does not exist.
    """
    data = request.get_json(silent=True)
    stock = data.get('stock') if isinstance(data, dict) else None
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
        return jsonify({'error': 'stock must be a non-negative integer'}), 400

    product = Product.query.get_or_404(product_id)
    product.stock = stock
    db.session.commit()
    return jsonify({'stock': product.stock})
```

### Order Service (订单服务)
```python
# services/order_service/app.py
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import requests
# Application wiring for the order service: Flask app and database handle.
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI='postgresql://localhost/orders_db',
)
db = SQLAlchemy(app)
class Order(db.Model):
    """Order header stored in the ``orders`` table."""

    __tablename__ = 'orders'

    id = db.Column(db.Integer, primary_key=True)
    # Plain integer, not a foreign key: no users table is declared in this service.
    user_id = db.Column(db.Integer, nullable=False)
    total_amount = db.Column(db.Float, nullable=False)
    status = db.Column(db.String(20), default='pending')
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(
        db.DateTime,
        server_default=db.func.now(),
        onupdate=db.func.now(),
    )
class OrderItem(db.Model):
    """Single line of an order, stored in the ``order_items`` table."""

    __tablename__ = 'order_items'

    id = db.Column(db.Integer, primary_key=True)
    order_id = db.Column(
        db.Integer,
        db.ForeignKey('orders.id'),
        nullable=False,
    )
    # Plain integer, not a foreign key: no products table is declared in this service.
    product_id = db.Column(db.Integer, nullable=False)
    quantity = db.Column(db.Integer, nullable=False)
    price = db.Column(db.Float, nullable=False)

    # The backref exposes the lines of an order as ``Order.items``.
    order = db.relationship('Order', backref='items')
def _fetch_product(product_id):
    """Fetch one product from the product service.

    Returns the decoded JSON dict, or ``None`` when the service answers 404.
    Raises ``requests.RequestException`` on transport errors or other error
    statuses, and ``ValueError`` when the response body is not valid JSON.
    """
    response = requests.get(
        f"http://product-service:5001/products/{product_id}",
        timeout=5,
    )
    if response.status_code == 404:
        return None
    response.raise_for_status()
    return response.json()


@app.route('/orders', methods=['POST'])
def create_order():
    """Create an order after checking stock with the product service.

    The body must be a JSON object with ``user_id`` and a non-empty ``items``
    list; each item needs ``product_id`` and a positive integer ``quantity``.
    A client-supplied ``price`` is ignored: the unit price reported by the
    product service is stored instead, so callers cannot choose their own
    price.

    Returns:
        ``{'order_id': ...}`` with status 201 on success; status 400 for an
        invalid body, an unknown product or insufficient stock; status 502
        when the product service cannot be reached during validation.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'user_id' not in data:
        return jsonify({'error': 'user_id is required'}), 400

    items = data.get('items')
    if not isinstance(items, list) or not items:
        return jsonify({'error': 'items must be a non-empty list'}), 400
    for item in items:
        quantity = item.get('quantity') if isinstance(item, dict) else None
        if (
            not isinstance(item, dict)
            or 'product_id' not in item
            or isinstance(quantity, bool)
            or not isinstance(quantity, int)
            or quantity <= 0
        ):
            return jsonify({
                'error': 'Each item needs a product_id and a positive integer quantity'
            }), 400

    # Verify stock and record the unit price for every line.
    unit_prices = []
    try:
        for item in items:
            product = _fetch_product(item['product_id'])
            if product is None:
                return jsonify({
                    'error': f'Product {item["product_id"]} not found'
                }), 400
            if product['stock'] < item['quantity']:
                return jsonify({
                    'error': f'Insufficient stock for product {item["product_id"]}'
                }), 400
            unit_prices.append(product['price'])
    except (requests.RequestException, ValueError):
        return jsonify({'error': 'Product service unavailable'}), 502

    # Create the order header; flush so ``order.id`` is assigned before the
    # lines that reference it are added.
    order = Order(user_id=data['user_id'], total_amount=0)
    db.session.add(order)
    db.session.flush()

    total_amount = 0
    for item, unit_price in zip(items, unit_prices):
        db.session.add(OrderItem(
            order_id=order.id,
            product_id=item['product_id'],
            quantity=item['quantity'],
            price=unit_price,
        ))
        total_amount += unit_price * item['quantity']
    order.total_amount = total_amount
    db.session.commit()

    # Deduct stock. The order is already committed, so a failure here is
    # logged rather than turned into an error response.
    # NOTE: this read-modify-write is not atomic; concurrent orders can still
    # oversell without an atomic decrement in the product service.
    for item in items:
        try:
            product = _fetch_product(item['product_id'])
            if product is None:
                app.logger.warning(
                    'Product %s disappeared before stock deduction',
                    item['product_id'],
                )
                continue
            requests.put(
                f"http://product-service:5001/products/{item['product_id']}/stock",
                json={'stock': product['stock'] - item['quantity']},
                timeout=5,
            )
        except (requests.RequestException, ValueError):
            app.logger.exception(
                'Stock deduction failed for product %s', item['product_id']
            )

    return jsonify({'order_id': order.id}), 201
@app.route('/orders/<int:order_id>', methods=['GET'])
def get_order(order_id):
    """Return an order with its lines; responds 404 when it does not exist."""
    record = Order.query.get_or_404(order_id)

    lines = []
    for line in record.items:
        lines.append({
            'product_id': line.product_id,
            'quantity': line.quantity,
            'price': line.price,
        })

    summary = {
        'id': record.id,
        'user_id': record.user_id,
        'total_amount': record.total_amount,
        'status': record.status,
        'items': lines,
    }
    return jsonify(summary)
```

## API Gateway 实现
```python
# api_gateway/app.py
from flask import Flask, request, jsonify
import requests
import jwt
from functools import wraps
app = Flask(__name__)

# First URL path segment -> base URL of the service that handles it.
SERVICES = dict(
    user='http://user-service:5000',
    product='http://product-service:5001',
    order='http://order-service:5002',
)
def authenticate(f):
    """Decorator that rejects requests without a valid JWT.

    Reads the ``Authorization`` header, expects ``"<scheme> <token>"``,
    verifies the token with HS256 and stores the caller's identity on
    ``request.user_id`` before calling ``f``.

    Returns (from the wrapper):
        Status 401 with an ``error`` message when the header is missing,
        malformed, fails verification or carries no identity claim;
        otherwise whatever ``f`` returns.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        header = request.headers.get('Authorization')
        if not header:
            return jsonify({'error': 'Missing token'}), 401

        # A header without a second part used to raise IndexError (HTTP 500).
        parts = header.split()
        if len(parts) < 2:
            return jsonify({'error': 'Invalid token'}), 401

        try:
            payload = jwt.decode(parts[1], 'your-secret-key', algorithms=['HS256'])
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401

        # Flask-JWT-Extended stores the identity under the "sub" claim by
        # default; "user_id" is still honoured for tokens that carry it.
        user_id = payload.get('user_id', payload.get('sub'))
        if user_id is None:
            return jsonify({'error': 'Invalid token'}), 401

        request.user_id = user_id
        return f(*args, **kwargs)
    return decorated_function
@app.route('/<service>/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy(service, path):
    """Forward the request to the backing service named by ``service``.

    Args:
        service: Key into ``SERVICES`` taken from the first URL segment.
        path: Remainder of the URL, appended to the service base URL.

    Returns:
        The upstream JSON body and status code; status 404 when ``service``
        is unknown; status 502 when the upstream service cannot be reached.
        A non-JSON upstream body is passed through unchanged.
    """
    if service not in SERVICES:
        return jsonify({'error': 'Service not found'}), 404

    url = f"{SERVICES[service]}/{path}"

    # Host belongs to this hop. Content-Length is dropped because the body is
    # re-encoded below and a stale length would not match it.
    skipped = {'host', 'content-length'}
    headers = {
        key: value for key, value in request.headers
        if key.lower() not in skipped
    }

    try:
        upstream = requests.request(
            method=request.method,
            url=url,
            headers=headers,
            # silent=True: requests without a JSON body forward no body
            # instead of failing while parsing.
            json=request.get_json(silent=True),
            params=request.args,
            timeout=10,
        )
    except requests.RequestException:
        return jsonify({'error': 'Upstream service unavailable'}), 502

    try:
        body = upstream.json()
    except ValueError:
        # Upstream did not answer with JSON (e.g. an HTML error page).
        content_type = upstream.headers.get(
            'Content-Type', 'application/octet-stream'
        )
        return upstream.content, upstream.status_code, {'Content-Type': content_type}

    return jsonify(body), upstream.status_code
@app.route('/health')
def health():
    """Health-check endpoint; always answers with a healthy status."""
    status = {'status': 'healthy'}
    return jsonify(status)
```
### 2. 事件驱动架构
示例:设计事件驱动架构
用户请求:
"设计一个基于事件驱动的订单处理系统"
Claude Code 生成的架构设计:
## 系统架构
```
┌──────────────┐
│ 订单服务 │
│ (Order │
│ Service) │
└──────┬───────┘
│ 发布事件
▼
┌─────────────────────────────────────────────────────────────┐
│ Message Broker │
│ (Kafka/RabbitMQ) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 订单创建 │ │ 订单支付 │ │ 订单发货 │ │ 订单完成 │ │
│ │ 事件 │ │ 事件 │ │ 事件 │ │ 事件 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ 库存服务 │ │ 支付服务 │ │ 物流服务 │ │ 通知服务 │
│ Inventory│ │ Payment │ │ Shipping │ │ Notify │
│ Service │ │ Service │ │ Service │ │ Service │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
```
## 事件定义
# events/order_events.py
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any
import json
@dataclass
class OrderEvent:
    """Base envelope shared by all order events."""

    event_type: str
    order_id: int
    timestamp: datetime
    data: Dict[str, Any]

    def to_dict(self) -> dict:
        """Return the event as a plain dict with an ISO-8601 timestamp string."""
        payload = dict(
            event_type=self.event_type,
            order_id=self.order_id,
            timestamp=self.timestamp.isoformat(),
            data=self.data,
        )
        return payload

    def to_json(self) -> str:
        """Return the event serialised as a JSON string."""
        serialisable = self.to_dict()
        return json.dumps(serialisable)
@dataclass
class OrderCreatedEvent(OrderEvent):
    """Event describing a newly created order.

    The event type is fixed to ``'order.created'`` and the timestamp is the
    naive UTC time at construction.

    Args:
        order_id: Identifier of the order.
        user_id: Identifier of the ordering user; stored in ``data``.
        total_amount: Order total; stored in ``data``.
        items: Order lines; stored in ``data`` as given.
    """

    def __init__(self, order_id: int, user_id: int, total_amount: float, items: list):
        # The base initialiser is ``__init__``; the former ``super().init(...)``
        # raised AttributeError on every construction.
        super().__init__(
            event_type='order.created',
            order_id=order_id,
            timestamp=datetime.utcnow(),
            data={
                'user_id': user_id,
                'total_amount': total_amount,
                'items': items,
            },
        )
@dataclass
class OrderPaidEvent(OrderEvent):
    """Event describing a paid order.

    The event type is fixed to ``'order.paid'`` and the timestamp is the
    naive UTC time at construction.

    Args:
        order_id: Identifier of the order.
        payment_method: Payment method; stored in ``data``.
        transaction_id: Payment transaction identifier; stored in ``data``.
    """

    # Was ``def init`` calling ``super().init``: the constructor was never used
    # and the call raised AttributeError. Matches OrderCreatedEvent's signature style.
    def __init__(self, order_id: int, payment_method: str, transaction_id: str):
        super().__init__(
            event_type='order.paid',
            order_id=order_id,
            timestamp=datetime.utcnow(),
            data={
                'payment_method': payment_method,
                'transaction_id': transaction_id,
            },
        )
@dataclass
class OrderShippedEvent(OrderEvent):
    """Event describing a shipped order.

    The event type is fixed to ``'order.shipped'`` and the timestamp is the
    naive UTC time at construction.

    Args:
        order_id: Identifier of the order.
        tracking_number: Shipment tracking number; stored in ``data``.
        shipping_address: Destination address; stored in ``data`` as given.
    """

    # Was ``def init`` calling ``super().init``: the constructor was never used
    # and the call raised AttributeError.
    def __init__(self, order_id: int, tracking_number: str, shipping_address: dict):
        super().__init__(
            event_type='order.shipped',
            order_id=order_id,
            timestamp=datetime.utcnow(),
            data={
                'tracking_number': tracking_number,
                'shipping_address': shipping_address,
            },
        )
@dataclass
class OrderCompletedEvent(OrderEvent):
    """Event describing a completed order.

    The event type is fixed to ``'order.completed'`` and the timestamp is the
    naive UTC time at construction.

    Args:
        order_id: Identifier of the order.
        delivery_time: Delivery time; stored in ``data`` as an ISO-8601 string.
    """

    # Was ``def init`` calling ``super().init``: the constructor was never used
    # and the call raised AttributeError.
    def __init__(self, order_id: int, delivery_time: datetime):
        super().__init__(
            event_type='order.completed',
            order_id=order_id,
            timestamp=datetime.utcnow(),
            data={
                'delivery_time': delivery_time.isoformat(),
            },
        )
# services/order_service/event_publisher.py
import pika
import json
from events.order_events import OrderEvent
class EventPublisher:
    """Event publisher.

    NOTE(review): no methods are present in this excerpt; the rest of the
    class body appears to be missing - confirm against the full source.
    """
## 事件消费者

# services/inventory_service/event_consumer.py
import pika
import json
import requests
class InventoryEventConsumer:
    """Consumes ``order.created`` events and deducts product stock.

    Binds the durable queue ``inventory_queue`` to the ``order_events`` topic
    exchange and updates stock through the product service's HTTP API.
    """

    # Was ``def init``: the constructor never ran, so ``connection`` and
    # ``channel`` were never set.
    def __init__(self, rabbitmq_url: str = 'amqp://localhost'):
        """Connect to RabbitMQ and set up the exchange, queue and binding.

        Args:
            rabbitmq_url: AMQP URL of the broker.
        """
        self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
        self.channel = self.connection.channel()

        # Declare the exchange and the queue.
        self.channel.exchange_declare(
            exchange='order_events',
            exchange_type='topic'
        )
        self.channel.queue_declare(queue='inventory_queue', durable=True)
        self.channel.queue_bind(
            exchange='order_events',
            queue='inventory_queue',
            routing_key='order.created'
        )

    def handle_order_created(self, ch, method, properties, body):
        """Deduct stock for every item of an ``order.created`` event.

        Args:
            ch: Channel the message arrived on; used to acknowledge it.
            method: Delivery metadata; supplies the delivery tag.
            properties: Message properties (unused).
            body: JSON-encoded event with the items under ``data.items``.

        The message is acknowledged only after every item has been processed;
        an exception raised earlier leaves it unacknowledged.
        """
        event = json.loads(body)
        print(f"Received order.created event: {event}")

        # Deduct stock.
        # NOTE: read-then-write against the product service is not atomic.
        for item in event['data']['items']:
            response = requests.get(
                f"http://product-service:5001/products/{item['product_id']}",
                timeout=5,
            )
            product = response.json()
            new_stock = product['stock'] - item['quantity']
            requests.put(
                f"http://product-service:5001/products/{item['product_id']}/stock",
                json={'stock': new_stock},
                timeout=5,
            )

        # Acknowledge the message.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def start_consuming(self):
        """Start consuming from ``inventory_queue``, one unacknowledged message at a time."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='inventory_queue',
            on_message_callback=self.handle_order_created
        )
        print('Inventory service started consuming messages...')
        self.channel.start_consuming()

    def close(self):
        """Close the broker connection."""
        self.connection.close()
# services/order_service/app.py
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
from event_publisher import EventPublisher
from events.order_events import OrderCreatedEvent
# Application wiring for the event-driven order service.
# ``Flask(name)`` referenced an undefined variable; the import name Flask
# expects here is the module's ``__name__``.
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/orders_db'
db = SQLAlchemy(app)
# Created once at import time and shared by the request handlers.
event_publisher = EventPublisher()
class Order(db.Model):
    """Order model.

    NOTE(review): no columns are present in this excerpt; presumably they
    match the Order model of the earlier order service - confirm.
    """

    # SQLAlchemy reads the table name from ``__tablename__``; a plain
    # ``tablename`` attribute is ignored.
    __tablename__ = 'orders'
class OrderItem(db.Model):
    """Order item model.

    NOTE(review): no columns are present in this excerpt; presumably they
    match the OrderItem model of the earlier order service - confirm.
    """

    # SQLAlchemy reads the table name from ``__tablename__``; a plain
    # ``tablename`` attribute is ignored.
    __tablename__ = 'order_items'
@app.route('/orders', methods=['POST'])
def create_order():
    """Create an order.

    NOTE(review): only the request parsing is present in this excerpt; the
    rest of the handler appears to be missing - confirm against the full source.
    """
    payload = request.get_json()
# Start the development server only when the file is executed directly.
# ``name == 'main'`` referenced an undefined variable; the module name is
# ``__name__`` and equals ``'__main__'`` for a script.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5002)